/*
 * 百度在线网络技术（北京）有限公司拥有本软件版权2021并保留所有权利。
 * Copyright 2021, Baidu.com,Inc 2:Baidu Online Network Technology (Beijing) Co.,Ltd,
 * All rights reserved.
 */

package com.azdebugit.kafka.future.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.*;

public class ComsumerDemoSync {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", "172.16.7.142:9092,172.16.7.143:9092,172.16.7.144:9092");
        configs.put("group.id", "test");
        configs.put("enable.auto.commit", "false");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
//                insertIntoDb(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}